package com.sanq.product.books.kafka.producer;

import com.sanq.product.books.kafka.producer.listeners.BooksTopicListener;
import com.sanq.product.books.kafka.producer.listeners.NoticeTopicListener;
import com.sanq.product.books.kafka.producer.listeners.SplideTopicListener;
import com.sanq.product.books.kafka.producer.listeners.VisiLogTopicListener;
import com.sanq.product.config.utils.web.LogUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.MessageListener;

import javax.annotation.Resource;

/**
 * KafkaMessageListenerImpl
 *
 * @author sanq.Yan
 * @date 2019/8/6
 */
public class KafkaMessageListenerImpl implements MessageListener<String, String> {

    @Value("${kafka.books.del}")
    public String kafkaBooksDelTopic;

    @Value("${kafka.notice.reply}")
    public String kafkaNoticeReplyTopic;

    @Value("${kafka.chapter.del}")
    public String kafkaChapterDelTopic;

    @Value("${kafka.books.search}")
    public String kafkaBooksSearch;

    @Value("${kafka.visi.log}")
    public String kafkaVisiLog;

    @Value("${kafka.common.topic}")
    public String kafkaCommonTopic;

    @Resource
    private BooksTopicListener booksTopicListener;
    @Resource
    private NoticeTopicListener noticeTopicListener;
    @Resource
    private VisiLogTopicListener visiLogTopicListener;
    @Resource
    private SplideTopicListener splideTopicListener;

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        LogUtil.getInstance(getClass()).i(String.format("%s:::%s", data.topic(), data.value()));

        if (kafkaBooksDelTopic.equals(data.topic())) {
            //小说删除
            booksTopicListener.deleteBooks(data.value());

        } else if (kafkaChapterDelTopic.equals(data.topic())) {
            // 章节删除
            booksTopicListener.deleteChapter(data.value());

        } else if (kafkaNoticeReplyTopic.equals(data.topic())) {
            //反馈回复
            noticeTopicListener.reply(data.value());

        } else if (kafkaBooksSearch.equals(data.topic())) {
            //添加到ES
            booksTopicListener.save2Es(data.value(), data.key());

        } else if (kafkaVisiLog.equals(data.topic())) {
            //访问日志
            visiLogTopicListener.saveLog(data.value());

        } else if (kafkaCommonTopic.equals(data.topic())) {

            splideTopicListener.saveData2DB(data.value(), data.key());

        }

    }
}
